{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[Row(fixed acidity=7.4, volatile acidity=0.7, citric acid=0.0, residual sugar=1.9, chlorides=0.076, free sulfur dioxide=11.0, total sulfur dioxide=34.0, density=0.9978, pH=3.51, sulphates=0.56, alcohol=9.4, quality=5)]"
      ]
     },
     "execution_count": 14,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "from pyspark.context import SparkContext\n",
    "from pyspark.sql.session import SparkSession\n",
    "sc = SparkContext('local')\n",
    "spark = SparkSession(sc)\n",
    "\n",
    "wine_df = spark.read.csv('winequality-red.csv', header = True, inferSchema=True, sep =';')\n",
    "wine_df.take(1)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+------------------+------------------+\n",
      "|summary|           alcohol|           quality|\n",
      "+-------+------------------+------------------+\n",
      "|  count|              1599|              1599|\n",
      "|   mean|10.422983114446502|5.6360225140712945|\n",
      "| stddev|1.0656675818473935|0.8075694397347051|\n",
      "|    min|               8.4|                 3|\n",
      "|    max|              14.9|                 8|\n",
      "+-------+------------------+------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "wine_df.select(\"alcohol\",\"quality\").describe().show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Correlation to MPG for  fixed acidity 0.12405164911322263\n",
      "Correlation to MPG for  volatile acidity -0.3905577802640061\n",
      "Correlation to MPG for  citric acid 0.22637251431804048\n",
      "Correlation to MPG for  residual sugar 0.013731637340065798\n",
      "Correlation to MPG for  chlorides -0.12890655993005293\n",
      "Correlation to MPG for  free sulfur dioxide -0.05065605724427597\n",
      "Correlation to MPG for  total sulfur dioxide -0.18510028892653774\n",
      "Correlation to MPG for  density -0.17491922778336474\n",
      "Correlation to MPG for  pH -0.0577313912053826\n",
      "Correlation to MPG for  sulphates 0.25139707906925995\n",
      "Correlation to MPG for  alcohol 0.4761663240011364\n",
      "Correlation to MPG for  quality 1.0\n"
     ]
    }
   ],
   "source": [
    "import six\n",
    "for i in wine_df.columns:\n",
    "    if not( isinstance(wine_df.select(i).take(1)[0][0], six.string_types)):\n",
    "        print( \"Correlation to MPG for \", i, wine_df.stat.corr('quality',i))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {},
   "outputs": [],
   "source": [
    "wine_df = wine_df.drop(\"residual sugar\").drop(\"free sulfur dioxide\") \\\n",
    "                       .drop(\"pH\").drop(\"density\") \\\n",
    "                       .drop(\"chlorides\").drop('fixed acidity')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[Row(volatile acidity=0.7, citric acid=0.0, total sulfur dioxide=34.0, sulphates=0.56, alcohol=9.4, quality=5, features=DenseVector([0.7, 0.0, 34.0, 0.56, 9.4]))]"
      ]
     },
     "execution_count": 20,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "from pyspark.ml.feature import VectorAssembler\n",
    "\n",
    "vectorAssembler = VectorAssembler(inputCols = ['volatile acidity', 'citric acid', 'total sulfur dioxide', 'sulphates', 'alcohol'], outputCol = 'features')\n",
    "vwine_df = vectorAssembler.transform(wine_df)\n",
    "vwine_df.take(1)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Linear Regression"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "metadata": {},
   "outputs": [],
   "source": [
    "splits = vwine_df.randomSplit([0.7, 0.3])\n",
    "train_df = splits[0]\n",
    "test_df = splits[1]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 24,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Coefficients: [-1.134803300789282,-0.011468089424519127,-0.0024666032938323525,0.8259746568202124,0.29643567763999223]\n",
      "Intercept: 2.7325981169381355\n"
     ]
    }
   ],
   "source": [
    "from pyspark.ml.regression import LinearRegression\n",
    "\n",
    "lr = LinearRegression(featuresCol = 'features', labelCol='quality', maxIter=10)\n",
    "lr_model = lr.fit(train_df)\n",
    "print(\"Coefficients: \" + str(lr_model.coefficients))\n",
    "print(\"Intercept: \" + str(lr_model.intercept))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 26,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+------------------+-------+--------------------+\n",
      "|        prediction|quality|            features|\n",
      "+------------------+-------+--------------------+\n",
      "| 6.321948236211139|      6|[0.16,0.64,52.0,0...|\n",
      "| 6.383392525710061|      6|[0.18,0.37,109.0,...|\n",
      "| 6.260948682710943|      6|[0.18,0.51,23.0,0...|\n",
      "| 6.260948682710943|      6|[0.18,0.51,23.0,0...|\n",
      "| 5.361153532922475|      5|[0.19,0.21,135.0,...|\n",
      "| 6.328172896188763|      6|[0.19,0.42,30.0,0...|\n",
      "| 6.183009588200807|      6|[0.22,0.24,28.0,0...|\n",
      "| 6.183009588200807|      6|[0.22,0.24,28.0,0...|\n",
      "| 5.553702448141423|      6|[0.22,0.48,60.0,0...|\n",
      "|5.5401318299853965|      4|[0.23,0.37,36.0,0...|\n",
      "| 6.226681409372491|      7|[0.24,0.35,27.0,0...|\n",
      "| 6.469942162899238|      7|[0.24,0.42,22.0,1...|\n",
      "| 6.447170802911483|      7|[0.24,0.46,21.0,1...|\n",
      "| 6.541619485323386|      6|[0.24,0.49,20.0,1...|\n",
      "| 6.711646680769182|      7|[0.25,0.39,10.0,0...|\n",
      "| 6.167801780409271|      6|[0.25,0.46,42.0,0...|\n",
      "| 5.813409467513248|      6|[0.26,0.42,27.0,0...|\n",
      "| 5.603333877120943|      5|[0.26,0.45,49.0,0...|\n",
      "|  6.35712538058033|      6|[0.26,0.48,10.0,0...|\n",
      "|  6.35712538058033|      6|[0.26,0.48,10.0,0...|\n",
      "+------------------+-------+--------------------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "predictions = lr_model.transform(test_df)\n",
    "predictions.select(\"prediction\",\"quality\",\"features\").show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 27,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "0.3359148546269666"
      ]
     },
     "execution_count": 27,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "#Find R2 for Linear Regression\n",
    "from pyspark.ml.evaluation import RegressionEvaluator\n",
    "evaluator = RegressionEvaluator(predictionCol=\"prediction\", \\\n",
    "                 labelCol=\"quality\",metricName=\"r2\")\n",
    "evaluator.evaluate(predictions)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 28,
   "metadata": {},
   "outputs": [],
   "source": [
    "sc.stop()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "conda_tensorflow_p36",
   "language": "python",
   "name": "conda_tensorflow_p36"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.4"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
